DartVM IsolateMessageHandler
介绍
在 Isolate 中,代码运行在消息队列中,IsolateMessageHandler 就是 Isolate 中的消息队列处理类。
IsolateMessageHandler 继承自 MessageHandler,要理解 IsolateMessageHandler,首先要理解它的父类 MessageHandler。
创建
IsolateMessageHandler 的创建位于 Isolate::InitIsolate 方法中,与 IsolateMessageHandler 相关的代码实现如下:
Isolate* result = new Isolate(isolate_group, api_flags);
// ...
MessageHandler* handler = new IsolateMessageHandler(result);
result->set_message_handler(handler);
result->set_main_port(PortMap::CreatePort(result->set_message_handler()));
// ...
HandleMesage 消息处理
MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
std::unique_ptr<Message> message) {
Thread* thread = Thread::Current();
StackZone stack_zone(thread);
Zone* zone = stack_zone.GetZone();
HandleScope handle_scope(thread);
// 如果是普通消息,查找它的 handler,并处理了一些异常情况(省略)
Object& msg_handler = Object::Handle(zone);
if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {
msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());
//......
}
// 消息解析,省略消息处理过程,省略消息异常报错
Object& msg_obj = Object::Handle(zone);
if (message->IsPersistentHandle()) {
// msg_array = [<message>, <object-in-message-to-rehash>]
// ... 省略消息处理过程
} else {
msg_obj = ReadMessage(thread, message.get());
}
Instance& msg = Instance::Handle(zone);
msg ^= msg_obj.ptr(); // Can't use Instance::Cast because may be null.
MessageStatus status = kOK;
// 如果是 OOB 控制消息
if (message->IsOOB()) {
// OOB 消息都是固定长度的数组,首元素叫 Smi 描述 OOB 目的地
if (msg.IsArray()) {
const Array& oob_msg = Array::Cast(msg);
if (oob_msg.Length() > 0) {
const Object& oob_tag = Object::Handle(zone, oob_msg.At(0));
if (oob_tag.IsSmi()) {
// 走到这里,说明是一个有效的 OOB 消息
switch (Smi::Cast(oob_tag).Value()) {
case Message::kServiceOOBMsg: {
// 这种跟 Isolate 没有关系
UNREACHABLE();
break;
}
case Message::kIsolateLibOOBMsg: {
// 跟 Isolate 有关系的是这一种
const Error& error = Error::Handle(HandleLibMessage(oob_msg));
if (!error.IsNull()) {
status = ProcessUnhandledException(error);
}
break;
}
}
}
}
}
} else if (message->dest_port() == Message::kIllegalPort) {
// 在这里处理普通消息
// 这里还有一个对 OOB 消息的兜底处理,先省略
if (msg.IsArray()) {
// ......
}
} else {
// DartLibraryCalls::HandleMessage 是 Isolate 进行进一步消息分发的方法
// msg_handler 是前面根据 port id 查到的处理器
// msg 是解析出来的消息
const Object& result =
Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
if (result.IsError()) {
status = ProcessUnhandledException(Error::Cast(result));
} else {
ASSERT(result.IsNull());
}
}
return status;
}
DartLibraryCalls::LookupHandler
在上面方法中,DartLibraryCalls::LookupHandler 用来根据 id 获取对应的 MessageHandler。
DartLibraryCalls::LookupHandler 的实现如下:
ObjectPtr DartLibraryCalls::LookupHandler(Dart_Port port_id) {
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
const auto& function = Function::Handle(
zone, thread->isolate_group()->object_store()->lookup_port_handler());
const int kNumArguments = 1;
ASSERT(!function.IsNull());
Array& args = Array::Handle(
zone, thread->isolate()->isolate_object_store()->dart_args_1());
if (args.IsNull()) {
args = Array::New(kNumArguments);
thread->isolate()->isolate_object_store()->set_dart_args_1(args);
}
args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
const Object& result =
Object::Handle(zone, DartEntry::InvokeFunction(function, args));
return result.ptr();
}
实现有点奇怪,都是在用反射从 C 层操作 C++ 层的东西。
从 Integer::Handle(zone, Integer::New(port_id)) 可以看出,这是拿着 port_id 生成了一个 Dart 层的整数
isolate_group()->object_store()->lookup_port_handler() 通过一番查找,这个反射(InvokeFunction)最终调用的是 _RawReceivePortImpl(Dart) 的 _lookupHandler。访问的是 Dart 的 PortMap,这样取出的实际上是 Dart 的 ReceivePort,并返回给了 C 层。
DartLibraryCalls::HandleMessage
代码实现如下:
ObjectPtr DartLibraryCalls::HandleMessage(const Object& handler,
const Instance& message) {
auto thread = Thread::Current();
auto zone = thread->zone();
auto isolate = thread->isolate();
auto object_store = thread->isolate_group()->object_store();
const auto& function =
Function::Handle(zone, object_store->handle_message_function());
const int kNumArguments = 2;
ASSERT(!function.IsNull());
Array& args =
Array::Handle(zone, isolate->isolate_object_store()->dart_args_2());
if (args.IsNull()) {
args = Array::New(kNumArguments);
isolate->isolate_object_store()->set_dart_args_2(args);
}
args.SetAt(0, handler);
args.SetAt(1, message);
const Object& result =
Object::Handle(zone, DartEntry::InvokeFunction(function, args));
ASSERT(result.IsNull() || result.IsError());
return result.ptr();
}
通过一番查找,handle_message_function() 对应的是 Dart 侧的 sdk/lib/_internal/vm/lib/isolate_patch.dart 的 _handleMessage:
// Called from the VM to dispatch to the handler.
@pragma("vm:entry-point", "call")
static void _handleMessage(Function handler, var message) {
handler(message);
_runPendingImmediateCallback();
}
这里的 Handler 就是传入 Dart PortMap 的 Handler(Dart 实现的),对于 ReceivePort 来说,就是它实现的 StreamController 的 add 方法。